Conversation
…(Phase 3.D PR 5b-2)
Stage 2 of PR 5b: wire SendMessage / ReceiveMessage / DeleteMessage
/ ChangeMessageVisibility through the per-key dispatch helpers
landed in PR 5b-1. The section 11 PR 2 dormancy gate still rejects
PartitionCount > 1 at CreateQueue, so production queues remain on
the legacy keyspace and observable behaviour is byte-identical.
PR 5b-3 lifts the gate atomically with the capability check.
Wiring
- SendMessage (sendMessage, sendBatchStandardOnce): partition is
a compile-time 0 (Standard queues reject PartitionCount > 1
via the cross-attribute validator). Dispatch helpers route to
legacy on the steady-state path.
- sendFifoMessage: hashes MessageGroupId once via partitionFor
and threads the result through every key construction (data,
vis, byage, dedup, group-lock).
- ReceiveMessage: scanAndDeliverOnce wraps the scan loop with a
per-partition fanout over effectivePartitionCount(meta)
iterations. The shared wall-clock + per-call max budget caps
apply across the whole call, not per-partition. sqsMsgCandidate
carries the partition field stamped at scan time so downstream
rotate / delete / expire helpers route to the right partition.
- DeleteMessage / ChangeMessageVisibility: handle.Partition from
the v2 receipt handle drives every key construction.
Receipt-handle dispatch
encodeReceiptHandleDispatch(meta, partition, ...) is the single
point that picks v1 vs v2. ReceiveMessage on a partitioned queue
records cand.partition into the handle so DeleteMessage routes
back to the same partition.
decodeClientReceiptHandle is now version-agnostic; the queue-
aware version check moved to validateReceiptHandleVersion which
runs in loadMessageForDelete / loadAndVerifyMessage once meta is
in scope. v1 on a partitioned queue and v2 on a non-partitioned
queue both surface as ReceiptHandleIsInvalid -- preserving the
dormancy promise (no v2 wire-format visibility from the public
API) under the new contract. The section 11 PR 2 dormancy gate,
still in force in this PR, keeps every queue non-partitioned, so
v2 handles still surface as ReceiptHandleIsInvalid downstream --
exactly the PR 5a observable behaviour.
Reaper
Reaper iterates legacy byAge keys only -- partition-iterating
enumeration ships in the later partition reaper PR. buildReapOps
/ reapOneRecord pass nil meta + partition 0 through dispatch
helpers so the keys are byte-identical to the pre-PR-5b layout.
Tests (10 added)
Unit tests (sqs_keys_dispatch_v2_test.go):
- TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount:
pins the version dispatch decision across nil/0/1/4 metas.
- TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1:
protects the byte-identical guarantee on legacy queues.
- TestEncodeReceiptHandleDispatch_PerQueueUsesV2: pins that
perQueue + PartitionCount=4 still produces v2 handles
(keyspace is partitioned even when partitionFor collapses
every group to partition 0).
- TestValidateReceiptHandleVersion_QueueAwareRules: 10
sub-cases covering the (meta.PartitionCount x handle.Version)
matrix, including perQueue + PartitionCount=4.
- TestValidateReceiptHandleVersion_NilHandle: defensive nil.
- TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned:
named regression for the dormancy guarantee under the new
contract.
- TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy:
byte-identical to legacy sqsMsgVisScanBounds on legacy meta.
- TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix:
different partitions yield disjoint scan ranges.
- TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace:
pins the PR 731 round 2 forward-note invariant -- perQueue
+ PartitionCount=4 keeps the partitioned vis prefix at
partition 0; collapsing to legacy would silently strand
send writes.
Integration tests (sqs_partitioned_dispatch_test.go) install a
partitioned meta directly on a queue created via the public API,
short-circuiting the dormancy gate without disabling it for
production CreateQueue:
- TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip:
end-to-end smoke test (send 6 groups, receive surfaces all
via fanout, every handle is v2, delete via v2 handle, queue
is empty afterwards, legacy keyspace stays empty).
- TestSQSServer_PartitionedFIFO_RejectsV1Handle: forged v1
handle on a partitioned queue surfaces as ReceiptHandleIsInvalid
via DeleteMessage and ChangeMessageVisibility.
- TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero:
perQueue + PartitionCount=4 receive surfaces every message
in one fanout pass; every v2 handle records Partition=0.
Updated TestDecodeClientReceiptHandle_RejectsV2 to
TestDecodeClientReceiptHandle_AcceptsV2 to reflect the contract
shift (rejection moved from API boundary to meta-aware
validateReceiptHandleVersion).
Self-review (per CLAUDE.md)
1. Data loss -- Dispatch helpers byte-identical on legacy queues
(PartitionCount<=1 routes to legacy constructors). DLQ FIFO
computes dlqPartition via partitionFor(dlqMeta, srcRec.MessageGroupId)
so cross-queue redrive lands in the right partition. Receive
fanout scans every partition. No issue.
2. Concurrency -- Receive fanout iterates partitions sequentially
under one shared wall-clock + max budget, so the original
per-call SLA is preserved. meta is loaded once and passed by
pointer through helpers; no concurrent meta refresh in a
single call. validateReceiptHandleVersion runs after the gen
check so a SetQueueAttributes race cannot flip the answer
(PartitionCount is immutable). go test -race clean.
3. Performance -- Hot path adds 1 nil-check + 1 PartitionCount
compare per dispatch. Receive fanout iterates effectivePartitionCount(meta)
times: 1 on legacy / perQueue, N on perMessageGroupId, with
N partitions splitting roughly 1/N of the messages each.
4. Data consistency -- Receipt handle's recorded partition matches
the storage partition (commitReceiveRotation uses cand.partition
for both newVisKey and the handle). Cross-version handles
rejected as ReceiptHandleIsInvalid -- no leak between keyspaces.
perQueue + PartitionCount=4 invariant pinned by both unit and
integration tests.
5. Test coverage -- 10 new tests across the contract surface.
Existing legacy-path SQS tests pass unchanged.
go test -race ./adapter/... and golangci-lint clean.
📝 Walkthrough: Wires partition-aware key and receipt-handle dispatch across SQS flows: send (FIFO and standard), receive (partition fanout with rotated start), delete, change-visibility, redrive, reaper, and batching. Adds v2 receipt-handle encoding/validation for partitioned queues and stamps message candidates with partition routing.
Sequence diagram:

```mermaid
sequenceDiagram
    participant Client
    participant SQS_Server
    participant Partitioner
    participant Visibility_Store
    participant Data_Store
    rect rgba(100, 150, 200, 0.5)
        Note over Client,SQS_Server: Partitioned FIFO Send
        Client->>SQS_Server: SendMessage(MessageGroupId="group-A")
        SQS_Server->>Partitioner: partitionFor(meta, "group-A")
        Partitioner-->>SQS_Server: partition = 2
        SQS_Server->>Data_Store: Write(sqsMsgDataKeyDispatch(..., partition=2, ...))
        SQS_Server->>Visibility_Store: Write(sqsMsgVisKeyDispatch(..., partition=2, ...))
        SQS_Server->>SQS_Server: encodeReceiptHandleDispatch(..., partition=2) → v2 handle
        SQS_Server-->>Client: SendMessageResponse(ReceiptHandle=v2)
    end
    rect rgba(200, 100, 150, 0.5)
        Note over Client,Visibility_Store: Partitioned Fanout Receive
        Client->>SQS_Server: ReceiveMessage(MaxMessages=10)
        SQS_Server->>SQS_Server: readTS → startPartitionOffset → partition[0..N)
        loop For Each Partition
            SQS_Server->>Visibility_Store: Scan(sqsMsgVisScanBoundsDispatch(..., partition, ...))
            Visibility_Store-->>SQS_Server: Candidates{partition, visKey, messageID}
        end
        SQS_Server->>Data_Store: Load(sqsMsgDataKeyDispatch(..., cand.partition, ...))
        SQS_Server->>SQS_Server: encodeReceiptHandleDispatch(..., cand.partition) → v2 handle
        SQS_Server-->>Client: ReceiveMessageResponse(Messages=[...], ReceiptHandles=[v2])
    end
    rect rgba(150, 200, 100, 0.5)
        Note over Client,Data_Store: Partition-Aware Delete
        Client->>SQS_Server: DeleteMessage(ReceiptHandle=v2)
        SQS_Server->>SQS_Server: validateReceiptHandleVersion(meta, v2_handle)
        SQS_Server->>Data_Store: Delete(sqsMsgDataKeyDispatch(..., v2_handle.partition, ...))
        SQS_Server->>Visibility_Store: Delete(sqsMsgVisKeyDispatch(..., v2_handle.partition, ...))
        SQS_Server-->>Client: DeleteMessageResponse()
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 5 passed
@claude review
Claude finished @bootjp's task in 8m 28s — View job
Code Review: feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2)

Overall assessment: This is a well-structured, carefully staged PR. The core abstraction — a single dispatch layer (…)

Medium:
| Path | Partition source | Consistent with send? |
|---|---|---|
| sendFifoMessage key construction | partitionFor(meta, in.MessageGroupId) | ✅ |
| commitReceiveRotation vis-key + handle | cand.partition (stamped at scan) | ✅ |
| buildDeleteOps vis/byage/group-lock keys | handle.Partition (v2 handle) | ✅ |
| changeVisibilityWithRetry old/new vis key | handle.Partition (v2 handle) | ✅ |
| buildRedriveOps DLQ keys | partitionFor(dlqMeta, dlqRec.MessageGroupId) | ✅ |
| Reaper (buildReapOps) | nil meta + partition 0 → legacy only | ✅ (intentional, PR 6 will extend) |
| validateReceiptHandleVersion | version-only check | |
- Snapshot consistency: readTS is computed once in scanAndDeliverOnce and threaded identically through all partition fanout calls — consistent multi-partition snapshot. ✅
- Group-lock partition coherence: partitionFor is deterministic (FNV-1a, power-of-two mask); a given MessageGroupId maps to exactly one partition for the lifetime of a queue generation. The group lock is always written and read at that same partition. ✅
- PartitionCount immutability: validatePartitionImmutability is enforced on SetQueueAttributes, making the meta-dispatch pattern race-safe without re-loading meta inside OCC transactions. ✅
- Dormancy preservation: Every code path that could emit a v2 handle is gated on meta.PartitionCount > 1, which CreateQueue continues to reject via validatePartitionDormancyGate until PR 5b-3. ✅
Test coverage
10 new tests are well-targeted. Particular highlights:
- TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace directly pins the forward-note invariant from PR 731 round 2 (perQueue + partitioned keyspace must scan the partitioned prefix, not legacy).
- TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip sanity-checks that the legacy keyspace is empty after all operations, which is the cleanest possible regression gate for future dispatch regressions.
- TestDecodeClientReceiptHandle_AcceptsV2 correctly documents the contract shift.
Missing: a test that a v2 handle with Partition >= meta.PartitionCount is rejected (tracking the medium finding above).
Verdict: Ready to merge after the partition bounds check is added to validateReceiptHandleVersion and covered by a test. The rest of the implementation is sound.
Code Review
This pull request implements the data plane wiring for partitioned FIFO queues. It introduces dispatch helpers to route key construction, receipt handle encoding, and version validation based on the queue's partition count. Updates were made to message sending, receiving, deletion, and redriving logic to support these partitions. A review comment pointed out an improvement opportunity in the integration tests to use a more robust helper for calculating prefix scan bounds instead of manually appending bytes.
```go
// Cap the prefix scan at the generation byte so we do not
// drag in unrelated queues.
end := append([]byte(nil), legacyDataPrefix...)
end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)
```
The logic for creating the end key for the prefix scan is incorrect. Appending 0xFF bytes does not correctly create an exclusive upper bound for all cases, especially when the prefix itself contains 0xFF bytes, which could cause the scan to read into adjacent key ranges (e.g., a different queue generation). This could lead to test flakiness or incorrect test behavior.
A more robust way to create an end key for a prefix is to find the next lexicographical key after the prefix. The codebase already has a helper for this, prefixScanEnd. You should use it here for correctness and consistency.
```diff
-// Cap the prefix scan at the generation byte so we do not
-// drag in unrelated queues.
-end := append([]byte(nil), legacyDataPrefix...)
-end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)
+// The end key for a prefix scan is the smallest key that does not
+// have the prefix. This correctly bounds the scan to only keys
+// within the given queue and generation.
+end := prefixScanEnd(legacyDataPrefix)
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 443ceb5f9c
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```go
for partition := uint32(0); partition < partitions; partition++ {
	if len(delivered) >= opts.Max {
		break
```
Avoid fixed partition-0 priority in receive fanout
When PartitionCount > 1, this loop always starts at partition 0 and exits as soon as opts.Max is filled, so repeated ReceiveMessage calls can permanently starve higher-index partitions whenever partition 0 has a sustained backlog. This only appears on partitioned queues, but once enabled it can make some message groups effectively undeliverable under load because later partitions are never scanned.
```go
		return errors.New("receipt handle version mismatch for non-partitioned queue")
	}
	return nil
}
```
Reject out-of-range v2 receipt-handle partitions
The queue-aware version check now accepts any v2 handle on partitioned queues but never validates handle.Partition < meta.PartitionCount. A forged v2 handle with an out-of-range partition is then used to build partitioned keys in delete/change-visibility paths; the partition resolver fails closed for unknown partitions, which bubbles up as a generic internal error instead of ReceiptHandleIsInvalid. This is reachable once partitioned queues are enabled.
…und 1)

Codex P1: with the fanout always starting at partition 0, a sustained backlog on partition 0 would fill opts.Max before the loop reached partition 1 — messages in higher-index partitions were never observed under load. Rotate the starting partition by readTS so consecutive receives spread across every partition.

Helper startPartitionOffset folds the upper and lower 32 bits of the HLC-derived readTS and mask-ANDs by partitions-1 (PartitionCount is power-of-two by validator invariant). The byte-slice fold preserves entropy from both HLC halves while keeping the conversion in uint32 space — no //nolint needed for gosec G115.

FIFO ordering is unaffected: partitionFor is deterministic, so a MessageGroupId always lands in one partition and cross-partition iteration order does not reorder messages within any group.

Caller audit: scanAndDeliverOnce / scanAndDeliverPartition signatures and error semantics are unchanged; only the internal iteration order rotates. New helper has only the one in-package caller.
@claude review

Round 1, addressing Codex P1: rotate the receive-fanout starting partition by readTS so a sustained backlog on a single partition cannot starve higher-index partitions. Changes: (…)

FIFO ordering: unaffected. Caller audit: (…). P2/medium findings (out-of-range v2 partition validation, prefix-scan helper in test) are intentionally deferred — below the loop's P0/P1/high/critical/major threshold.
Claude finished @bootjp's task in 2m 22s — View job
Code Review: Round 1 — Partition-starvation fix (…)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 52c449f5b3
```go
startOffset := startPartitionOffset(partitions, readTS)
for i := uint32(0); i < partitions; i++ {
```
Rotate receive start independently from commit-ts stride
Using readTS as the only rotation source can still starve partitions under sustained load, because readTS comes from the last committed timestamp and often advances by a fixed stride (roughly the number of successful per-message rotations in each receive call). With power-of-two partitions, masking this structured value can lock start offsets to a subset (e.g. 4 partitions + MaxNumberOfMessages=10 can alternate between two starts), so if those early partitions stay backlogged, later partitions are never scanned.
Actionable comments posted: 3
🧹 Nitpick comments (1)
adapter/sqs_partitioned_dispatch_test.go (1)
288-317: ⚡ Quick win: pin the single-pass behavior instead of retrying receives.

This loop lets the test pass even if effectivePartitionCount never collapses to 1, because repeated receives can still drain partition 0 eventually. If the contract here is "one receive call must surface every message," make that the assertion directly.

Suggested tightening:
```diff
 collected := make(map[string]bool, len(groups))
-for range 4 {
-	if len(collected) == len(groups) {
-		break
-	}
-	status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
-		"QueueUrl":            queueURL,
-		"MaxNumberOfMessages": 10,
-		"VisibilityTimeout":   60,
-	})
-	require.Equal(t, http.StatusOK, status, "receive: %v", out)
-	msgs, _ := out["Messages"].([]any)
-	for _, m := range msgs {
-		mm, _ := m.(map[string]any)
-		body, _ := mm["Body"].(string)
-		collected[body] = true
-		handle, _ := mm["ReceiptHandle"].(string)
-		parsed, err := decodeReceiptHandle(handle)
-		require.NoError(t, err)
-		require.Equal(t, sqsReceiptHandleVersion2, parsed.Version,
-			"perQueue + PartitionCount=4 must still produce v2 handles")
-		require.Equal(t, uint32(0), parsed.Partition,
-			"perQueue mode pins every group to partition 0, so every handle must record Partition=0")
-	}
-}
+status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
+	"QueueUrl":            queueURL,
+	"MaxNumberOfMessages": 10,
+	"VisibilityTimeout":   60,
+})
+require.Equal(t, http.StatusOK, status, "receive: %v", out)
+msgs, _ := out["Messages"].([]any)
+for _, m := range msgs {
+	mm, _ := m.(map[string]any)
+	body, _ := mm["Body"].(string)
+	collected[body] = true
+	handle, _ := mm["ReceiptHandle"].(string)
+	parsed, err := decodeReceiptHandle(handle)
+	require.NoError(t, err)
+	require.Equal(t, sqsReceiptHandleVersion2, parsed.Version,
+		"perQueue + PartitionCount=4 must still produce v2 handles")
+	require.Equal(t, uint32(0), parsed.Partition,
+		"perQueue mode pins every group to partition 0, so every handle must record Partition=0")
+}
 require.Len(t, collected, len(groups),
 	"perQueue receive must surface every message in one fanout pass over partition 0")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapter/sqs_partitioned_dispatch_test.go` around lines 288 - 317, The test currently loops multiple receive calls which masks failures by eventually draining partition 0; instead make a single receive call using callSQS(sqsReceiveMessageTarget, ...) and assert that the response contains exactly len(groups) Messages in that one call, then iterate those messages to validate decodeReceiptHandle parsed.Version equals sqsReceiptHandleVersion2 and parsed.Partition == 0; remove the retry loop and fail the test immediately if the single receive doesn't return all messages so the contract “one receive call must surface every message” is enforced.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@adapter/sqs_keys_dispatch.go`:
- Around line 182-195: In validateReceiptHandleVersion, add a bounds check for
v2 handles so that when meta != nil && meta.PartitionCount > 1 and
handle.Version == sqsReceiptHandleVersion2 you also verify handle.Partition <
meta.PartitionCount; if the partition is out of range return the
ReceiptHandleIsInvalid error (or an appropriate error indicating invalid receipt
handle) instead of allowing it to fall through to downstream routing; update the
function logic around validateReceiptHandleVersion to perform this check before
returning nil.
In `@adapter/sqs_partitioned_dispatch_test.go`:
- Around line 91-106: The test currently only asserts parsed.Partition is within
a range which can hide misrouting; instead, after populating sent (the map from
group -> MessageId) reverse that map to look up group by MessageId and assert
that for each received handle the decoded parsed.Partition equals
partitionFor(meta, group). Concretely: after sending messages with callSQS
(using MessageGroupId) and collecting sent[g]=msgID, when you decode handles
check the MessageId from the handle, find the original group via the reversed
sent map, then replace the loose parsed.Partition < 4 assertion with
require.Equal(t, partitionFor(meta, group), parsed.Partition) so each message is
asserted to land on its expected partition.
- Around line 160-191: The current post-delete check re-calls
callSQS(sqsReceiveMessageTarget) immediately and can miss undeleted but
invisible messages (earlier receives used VisibilityTimeout: 60), so a
regression that leaves records invisible would still pass; instead of relying on
immediate receive, either wait > VisibilityTimeout before re-receiving or
(preferred) directly assert the partitioned keyspace is empty by scanning the
partitioned data and visibility prefixes with node.sqsServer.store.ScanAt using
the queue generation from node.sqsServer.loadQueueMetaAt (you already use
sqsMsgDataKey for the legacy prefix—compute the analogous partitioned
data/visibility prefixes for this queue generation and assert ScanAt returns
zero entries), keeping the existing legacy-prefix check for completeness.
---
Nitpick comments:
In `@adapter/sqs_partitioned_dispatch_test.go`:
- Around line 288-317: The test currently loops multiple receive calls which
masks failures by eventually draining partition 0; instead make a single receive
call using callSQS(sqsReceiveMessageTarget, ...) and assert that the response
contains exactly len(groups) Messages in that one call, then iterate those
messages to validate decodeReceiptHandle parsed.Version equals
sqsReceiptHandleVersion2 and parsed.Partition == 0; remove the retry loop and
fail the test immediately if the single receive doesn't return all messages so
the contract “one receive call must surface every message” is enforced.
📒 Files selected for processing (9)
- adapter/sqs_fifo.go
- adapter/sqs_keys_dispatch.go
- adapter/sqs_keys_dispatch_v2_test.go
- adapter/sqs_messages.go
- adapter/sqs_messages_batch.go
- adapter/sqs_partitioned_dispatch_test.go
- adapter/sqs_reaper.go
- adapter/sqs_receipt_handle_v2_test.go
- adapter/sqs_redrive.go
…sertions (PR #732, round 2)

CodeRabbit P1/Major: validateReceiptHandleVersion accepted any v2 handle on a partitioned queue without checking handle.Partition < meta.PartitionCount. With decodeClientReceiptHandle now accepting v2 on the wire, an out-of-range partition would fall through to sqsMsg*KeyDispatch and depend on downstream routing failure semantics rather than returning ReceiptHandleIsInvalid at the documented choke point. Add the bounds check and surface the same "receipt handle is invalid" public error.

Caller audit: both production callers (loadMessageForDelete, loadAndVerifyMessage in sqs_messages.go) already wrap any non-nil return from validateReceiptHandleVersion as sqsErrReceiptHandleInvalid HTTP 400, so the new error case slots into the existing semantic bucket — no caller-side change required.

Two test tightenings on PR #732 round 2:
- TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip previously asserted only that parsed.Partition < 4. That would still pass if a dispatch regression sent every group to partition 0. Reverse the group->messageID map to messageID->group and require parsed.Partition == partitionFor(meta, group) per message — this is the assertion that actually catches misrouting.
- The post-delete emptiness check ran a fresh receive immediately while every prior receive used VisibilityTimeout=60. A regression that turned DeleteMessage into "leave the record invisible but not removed" would still pass under the active visibility window. Probe the partitioned data keyspace directly per partition (which DeleteMessage targets), and also issue the public-API receive after sleeping past a 1s visibility window so any in-flight invisible record would re-expose.

Add a regression test (TestValidateReceiptHandleVersion_RejectsOutOfRangePartition) covering several out-of-range partitions and the count-1 boundary.
@claude review

Round 2 — addressed three CodeRabbit 🟠 Major findings, plus a new regression test.
Claude finished @bootjp's task in 3m 10s — View job
Code Review: Round 2 — Three Major Findings Addressed

Overall assessment: All three 🟠 Major findings are correctly addressed. The implementations are sound, the new regression test is well-targeted, and no new issues were introduced. Two nits remain (carried from prior rounds but explicitly deferred). Ready to merge.

Finding 1: Bounds check in (…)
| Contract | Before Round 2 | After Round 2 |
|---|---|---|
| Out-of-range handle.Partition on partitioned queue | Falls through to dispatch → downstream routing error | ReceiptHandleIsInvalid at validateReceiptHandleVersion ✅ |
| Partition routing assertion in round-trip test | parsed.Partition < 4 (silent zero-collapse regression) | parsed.Partition == partitionFor(meta, group) ✅ |
| Post-delete "invisible but not deleted" regression | Undetectable (60s VT still active on re-receive) | Caught by direct data-keyspace scan per partition ✅ |
Verdict: Ready to merge.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 11fbd45b18
```go
folded := binary.BigEndian.Uint32(buf[0:4]) ^ binary.BigEndian.Uint32(buf[4:8])
return folded & (partitions - 1)
```
Derive fanout start from per-call counter, not readTS bits
This rotation still depends only on masked readTS bits, so it can lock onto a subset of partitions when readTS advances by a structured stride (which is common here because HLC stores a 16-bit logical counter in the low bits and ReceiveMessage often commits a fixed number of per-message transactions). For example, with 4 partitions and a stride of 10 commits per call, folded & (partitions-1) alternates between two starts, so hot early partitions can keep filling MaxNumberOfMessages and starve later partitions. Fresh evidence: the new fold/XOR still ends in & (partitions - 1), which preserves this low-bit stride aliasing.
```diff
-func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
-	key := sqsMsgDedupKey(queueName, gen, dedupID)
+func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
+	key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, dedupID)
```
Include MessageGroupId in partitioned dedup routing
DeduplicationScope=messageGroup requires dedup to be isolated per message group, but this key construction uses only (queue, partition, dedupID). When two different groups hash to the same partition and reuse a dedup ID within the window, the second send is incorrectly treated as a duplicate and dropped/acknowledged with the first message ID. That is a correctness/data-loss issue for partitioned FIFO once the dormancy gate is lifted (and it is already reachable in the partitioned-meta integration path).
… fanout (PR #732, round 3)

Two Codex P1s on commit 11fbd45:

1. Dedup scoping (sqs_fifo.go:95) — On a partitioned queue DeduplicationScope is messageGroup (the validator rejects queue-scope on PartitionCount > 1), but the dedup key was being constructed with only (queue, partition, dedupID). Two distinct MessageGroupIds whose FNV-1a hashes happen to collide onto the same partition would then share a dedup namespace, so a fresh send in group B reusing group A's dedup-id would be silently acked with group A's MessageId — a data-loss outcome. Per the design doc §4.1 line 200, the dedup key keys on (queue, partition, MessageGroupId, dedupID); partition is redundant given the deterministic group→partition map but is kept for keyspace organisation (every key belonging to a group lives under the same partition prefix). Thread groupID through sqsPartitionedMsgDedupKey, sqsMsgDedupKeyDispatch, and loadFifoDedupRecord.

Caller audit: loadFifoDedupRecord has exactly one production caller (sendFifoMessage at sqs_fifo.go:186); the dedupKey it returns is reused by the same OCC transaction's ReadKeys + Put list, so the read-guard and the write share one key. sqsMsgDedupKeyDispatch is only reached via loadFifoDedupRecord. Legacy (non-partitioned) keys are unchanged on disk — the legacy branch ignores groupID, preserving the on-disk shape for queues created before partitioning landed.

2. Fanout aliasing (sqs_messages.go:933) — The round-1 fix derived the fanout starting partition from a XOR-fold of readTS bits and masked by (partitions - 1). Codex P1 round 2 flagged a real aliasing concern: HLC packs a 16-bit logical counter in the low bits of readTS and ReceiveMessage commits a fixed number of per-message transactions per call, so consecutive readTS deltas exhibit a structured stride. With partitions=4 and stride=10 the masked fold alternates between two starts (0,2,0,2,…) — partitions 1 and 3 never appear as the start, and opts.Max can fill from those two before the loop reaches the others.

Replace the bit-fold with a per-server atomic counter (receiveFanoutCounter on SQSServer). Consecutive partitioned receives walk every partition in strict round-robin regardless of HLC behaviour. The counter is per-server (not Raft-replicated) because fanout distribution is a local-fairness property: every node already sees every message via its own receive scans, and "this node's distribution is balanced" is the only invariant that matters.

Add regression tests:
- TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId pins that distinct MessageGroupIds on the same (queue, partition, dedupID) produce distinct dedup keys, and that legacy keys are unaffected.
- TestNextReceiveFanoutStart_RoundRobin pins that 16 consecutive calls over 4 partitions produce exactly 4 hits per partition (strict round-robin), and that concurrent receivers each get a valid offset under the race detector.
@claude review Round 3 — addressed two new Codex 🟠 P1 findings on the round-2 commit: 1. dedup scoping (the messageGroup-scoped dedup key now threads MessageGroupId, with caller audit), 2. fanout aliasing (per-server round-robin counter replaces the readTS bit-fold). Two new regression tests.
Claude finished @bootjp's task in 6m 25s —— View job

Code Review: Round 3 — Dedup scoping + Fanout aliasing fixes

Overall assessment: Both P1 findings from the round-2 Codex review are correctly addressed. The implementations are clean, the regression tests are well-targeted, and no new issues were introduced. Ready to merge.
| Contract | Before Round 3 | After Round 3 |
|---|---|---|
| Partitioned dedup key scope | `(queue, partition, gen, dedupID)` — two groups colliding on the same partition share a dedup namespace | `(queue, partition, gen, groupID, dedupID)` — groups have disjoint namespaces ✅ |
| Receive fanout rotation | XOR-fold of readTS bits — aliases under HLC stride patterns (Codex P1) | Per-server atomic counter — true round-robin regardless of HLC ✅ |
| Partition resolver compatibility | N/A | Unaffected — resolver only parses through the partition field ✅ |
| Legacy keyspace on-disk compatibility | N/A | Unaffected — legacy branch ignores groupID ✅ |
Verdict: Ready to merge.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ea0cf4ba25
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
    if partitions <= 1 {
        return 0
    }
    return s.receiveFanoutCounter.Add(1) & (partitions - 1)
Scope fanout rotation counter per queue
Using a single server-wide receiveFanoutCounter lets each queue observe only a strided subsequence of start offsets whenever other queues call ReceiveMessage in between. If that stride shares a factor with PartitionCount (for example, a 4-partition queue that is called every other partitioned receive), the queue’s starts cycle through only a subset of partitions, and with MaxNumberOfMessages filling on earlier scanned partitions, the remaining partitions can be starved indefinitely. This means the global counter can reintroduce the same starvation class the rotation logic is trying to prevent; the rotation state should be tracked per queue (or otherwise keyed by queue identity).
Codex P1 round 4 on commit ea0cf4b: the round-3 fix used a single server-wide
receiveFanoutCounter, but that aliases across queues. When other queues call
ReceiveMessage in between, the queue of interest observes only a strided
subsequence of counter values; if that stride shares a factor with
PartitionCount, the queue cycles through only a subset of partitions, and with
MaxNumberOfMessages filling on the earlier-scanned partitions the rest can be
starved indefinitely. Concrete example: a 4-partition queue receiving every
other tick sees counter values 1,3,5,7,… and 1,3,5,7 mod 4 = 1,3,1,3 →
partitions 0 and 2 never appear as the start.

Replace the server-wide atomic.Uint32 with a sync.Map keyed by queue name;
each queue owns its own *atomic.Uint32. nextReceiveFanoutStart takes
(queueName, partitions) and looks up / installs the per-queue counter. Each
queue's rotation now depends solely on its own receive cadence — strict
round-robin regardless of cross-queue interleaving.

Caller audit: nextReceiveFanoutStart has exactly one production caller
(scanAndDeliverOnce at sqs_messages.go:885), which now passes queueName from
its existing scope. Return type and range are unchanged ([0, partitions)); no
caller-side semantic shift. sync.Map is the right shape: lookups are
read-mostly (the same queue keeps getting the same counter) and the keyset
grows only with the number of distinct queues this server has handled receives
for in-process — bounded by the operator-controlled CreateQueue rate.

Update TestNextReceiveFanoutStart_RoundRobin → renamed to
TestNextReceiveFanoutStart_PerQueueRoundRobin. New assertions:
- queueB and queueC interleaved must each independently see strict round-robin
  (the round-4 isolation contract); the legacy server-wide counter would have
  surfaced 1,3,1,3 on queueB → fails.
- Concurrent receivers spread across multiple queue names exercise the
  sync.Map LoadOrStore path under -race so any missed synchronisation on the
  insert is caught.
@claude review Round 4 — addressed Codex 🟠 P1 round 4: per-queue fanout counter. The round-3 server-wide counter aliased across queues — when queues interleave with strides that share a factor with PartitionCount, some partitions never appear as the start. Replaced the server-wide atomic.Uint32 with a per-queue counter in a sync.Map; caller audit and test rename in the commit message.
Claude finished @bootjp's task in 2m 5s —— View job

Code Review: Round 4 — Per-queue fanout counter

Overall assessment: The fix is correct and the right abstraction for this problem. Ready to merge.
| Contract | Before Round 4 | After Round 4 |
|---|---|---|
| Fanout rotation scoping | Single server-wide `atomic.Uint32` — aliases when queues interleave with strides sharing a factor with PartitionCount | Per-queue `*atomic.Uint32` in a `sync.Map` — rotation depends only on each queue's own receive cadence ✅ |
| Cross-queue isolation | Broken: 4-partition queueB receiving every other tick → 1,3,1,3 → partitions 0,2 starved | Fixed: queueB's counter increments only on queueB receives ✅ |
| Non-partitioned queues | Unchanged: returns 0, no map touch | Unchanged ✅ |
| Race safety | `atomic.Uint32.Add` — safe | `sync.Map.Load`/`LoadOrStore` + `atomic.Uint32.Add` — safe ✅ |
Verdict: Ready to merge.
💡 Codex Review
Reviewed commit: 667dea9c37
    }
    v, ok := s.receiveFanoutCounters.Load(queueName)
    if !ok {
        v, _ = s.receiveFanoutCounters.LoadOrStore(queueName, &atomic.Uint32{})
Evict stale fanout counters when queues are removed
nextReceiveFanoutStart creates and retains a receiveFanoutCounters entry for every queue name ever received from, but there is no corresponding cleanup on DeleteQueue/recreate paths, so repeated create/delete of unique queue names causes unbounded in-memory growth over process lifetime. This is a new long-lived state path introduced by the per-queue counter and can become a memory/perf issue in multi-tenant or high-churn environments; the delete path currently invalidates throttle state only, not this map.
… round 5)
Codex P2 (user-elevated): nextReceiveFanoutStart populates a sync.Map
entry per queue name on the first partitioned receive, but DeleteQueue
never removes it. Repeated create/delete of unique queue names then
leaks one entry per name for the lifetime of the process — a real
problem for multi-tenant or high-churn deployments.
Mirror throttle.invalidateQueue's two call sites:
- createQueue genuine-create branch — drops a counter that survived
a delete-then-create race (a partitioned receive holding pre-delete
meta could repopulate the entry between the delete-side cleanup
and this create commit). Tied to the same "fresh state on a true
create" guarantee the throttle invalidate provides.
- deleteQueue HTTP handler — drops the counter alongside the
throttle bucket so DeleteQueue+CreateQueue resets every piece of
in-memory queue state, not just the rate-limit bucket.
setQueueAttributes is intentionally NOT mirrored: it only mutates
throttle config, never PartitionCount, so the counter remains valid
for the same queue across attribute changes — same conditional shape
the throttle invalidate already uses there.
Caller audit: the new dropReceiveFanoutCounter helper has exactly two
production callers, both adjacent to the existing throttle.invalidateQueue
calls. AdminDeleteQueue does not call throttle.invalidateQueue today
either; staying consistent rather than fixing that pre-existing
inconsistency in this round.
Test: TestDropReceiveFanoutCounter_ClearsEntry pins
- no-op on never-touched queues (must not allocate or panic),
- removal after a populated counter,
- per-queue isolation (dropping queueB leaves queueC's counter
intact),
- fresh allocation after drop (the recreated queue starts from a
zero counter, not a recycled pointer — pinned by asserting the
first offset == 1 on a 4-partition queue).
@claude review Round 5 — addressed Codex P2 (user-elevated): per-queue fanout counter eviction. The round-4 per-queue counters were never removed on DeleteQueue, leaking one entry per queue name for the process lifetime. Fix: new dropReceiveFanoutCounter helper, called from the deleteQueue handler and the genuine-create branch of createQueue, mirroring throttle.invalidateQueue.

Caller audit: the new helper has exactly two production callers, both adjacent to the existing throttle.invalidateQueue calls. Test: TestDropReceiveFanoutCounter_ClearsEntry.
Claude finished @bootjp's task in 1m 53s —— View job

Code Review: Round 5 — Per-queue fanout counter eviction

Overall assessment: The fix is correct, minimal, and faithful to the pattern it mirrors.
func (s *SQSServer) dropReceiveFanoutCounter(queueName string) {
s.receiveFanoutCounters.Delete(queueName)
}
Race analysis: clean. Caller audit: two call sites, both adjacent to the existing throttle.invalidateQueue calls. Test:
| Assertion | What regression it catches |
|---|---|
| No entry for "never-touched" after drop | `Delete` allocates a zero entry — would break the memory-leak fix |
| Entry present after `nextReceiveFanoutStart` | No-op `LoadOrStore` regression |
| Entry absent after drop | The core leak: `Delete` silently failing |
| Per-queue isolation (B removed, C intact) | `Delete` clearing the whole map or the wrong key |
| `first == 1` after drop + re-populate | Old pointer recycled: `Delete` didn't actually remove the entry |
The first == 1 assertion is particularly tight: it would fail if Delete removed the key but a concurrent goroutine's LoadOrStore returned the old pointer (impossible with sync.Map semantics, but the test proves it). ✅
Minor observations (non-blocking)
1. PR reference in source comment (sqs_catalog.go:1051)

// — a leak in multi-tenant / high-churn deployments. Codex P2,
// PR #732 round 5.

Per CLAUDE.md: "Don't reference the current task, fix, or callers... since those belong in the PR description and rot as the codebase evolves." The "Codex P2, PR #732 round 5" attribution belongs in the commit message (where it already is, verbatim). The comment at the createQueue call site does not have this issue. Minor style nit; the deleteQueue comment reads naturally without it.
2. dropReceiveFanoutCounter doc block length
The function comment is 9 lines. CLAUDE.md convention is "one short line max" — the non-obvious WHY fits in two lines: the leak scenario + the no-op guarantee. The rest is reachable from the commit message. Not new to Round 5 (Rounds 3–4 had the same issue on nextReceiveFanoutStart) and not blocking.
Correctness table (Round 5)
| Contract | Before Round 5 | After Round 5 |
|---|---|---|
| `receiveFanoutCounters` entry lifetime | Retained for process lifetime per queue name ever served | Evicted on deleteQueue and genuine createQueue ✅ |
| Multi-tenant / high-churn leak | One entry per unique queue name leaked for process lifetime | Bounded to currently-alive queues ✅ |
| Delete-then-create race | Stale counter from pre-delete incarnation survives into new queue | Cleaned up at genuine-create path ✅ |
| `AdminDeleteQueue` eviction | Not performed (pre-existing; `throttle.invalidateQueue` also absent) | Not performed (intentionally consistent) |
Verdict: Ready to merge.
Actionable comments posted: 2

🧹 Nitpick comments (1)

adapter/sqs_messages.go (1)

865-885: ⚡ Quick win — Update this comment to describe the counter-based rotation.

The code now uses `nextReceiveFanoutStart(queueName, partitions)` and `receiveFanoutCounters`, but this block still explains the removed readTS/HLC selector and "no per-server state". That mismatch will mislead the next refactor, especially around the create/delete counter cleanup.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@adapter/sqs_keys.go`:
- Around line 326-334: The suffix built by sqsPartitionedMsgDedupKey is
ambiguous because encodeSQSSegment(groupID) and encodeSQSSegment(dedupID) are
appended back-to-back; fix by inserting the same segment terminator used
elsewhere (e.g., sqsPartitionedQueueTerminator) between the two encoded segments
so the boundary is explicit. Modify sqsPartitionedMsgDedupKey to append
encodeSQSSegment(groupID), then append the terminator (the same symbol used
after queueName), then append encodeSQSSegment(dedupID) so (groupID,dedupID)
pairs are unambiguous.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4d4c114c-4ddf-4764-b8b9-de5b92b9e0f3
📒 Files selected for processing (10)
- adapter/sqs.go
- adapter/sqs_catalog.go
- adapter/sqs_fifo.go
- adapter/sqs_keys.go
- adapter/sqs_keys_dispatch.go
- adapter/sqs_keys_dispatch_test.go
- adapter/sqs_keys_test.go
- adapter/sqs_messages.go
- adapter/sqs_partition_resolver_test.go
- adapter/sqs_partitioned_dispatch_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- adapter/sqs_fifo.go
    // And the public API must agree: a fresh receive with a short
    // visibility timeout (after sleeping past it, so any in-flight
    // invisible record would re-expose) returns no messages.
    time.Sleep(1100 * time.Millisecond)
    status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
        "QueueUrl":            queueURL,
        "MaxNumberOfMessages": 10,
        "VisibilityTimeout":   1,
    })
    require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out)
    if msgs, _ := out["Messages"].([]any); len(msgs) > 0 {
        t.Fatalf("expected empty queue after delete; got %d messages", len(msgs))
    }
Post-delete API check cannot catch "leave-invisible" regressions from the drain phase.

The comment claims sleeping 1100 ms ensures "any in-flight invisible record would re-expose," but the drain loop (lines 137–141) uses VisibilityTimeout: 60. A message left invisible by a failing DeleteMessage would still have ~58.9 s of visibility remaining after 1100 ms and would not be returned — so the require on line 211 would silently pass even if deletion regressed to a no-op.

The keyspace scan (lines 186–198) is the only correct guard for that class of bug. The comment incorrectly implies this block provides equivalent coverage; a future maintainer who removes the keyspace scan while trusting this comment would lose the protection.

Two clean fixes:

🛠️ Option A — lower the drain VTO so the sleep does expire it

    - "VisibilityTimeout": 60,
    + "VisibilityTimeout": 1,

The 1100 ms sleep then expires the 1 s window, and any invisible-but-not-deleted record would reappear in the final receive.

📝 Option B — correct the comment to reflect what this block actually tests

    - // And the public API must agree: a fresh receive with a short
    - // visibility timeout (after sleeping past it, so any in-flight
    - // invisible record would re-expose) returns no messages.
    + // Confirm the public API also observes an empty queue. Note: the
    + // drain receives used VisibilityTimeout=60, so records left invisible
    + // by a regressed DeleteMessage would NOT re-expose here. The
    + // keyspace scan above is the definitive guard for that regression;
    + // this receive is a belt-and-suspenders check for completely unrelated
    + // messages only.
+ // messages only.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@adapter/sqs_partitioned_dispatch_test.go` around lines 200 - 212, The
post-delete sleep + receive block falsely claims it will catch "leave-invisible"
regressions because the drain loop sets VisibilityTimeout: 60, so sleeping
1100ms won't re-expose messages left invisible by a failed DeleteMessage; fix by
either lowering the drain loop's VisibilityTimeout to 1 (so the 1100ms sleep
will expire it) or update the comment to state this receive does NOT cover
drain-phase "leave-invisible" regressions and that the keyspace scan is the real
guard—change references around the drain loop and the receive call (callSQS with
sqsReceiveMessageTarget and the VisibilityTimeout param) accordingly.
… round 6)
CodeRabbit major: sqsPartitionedMsgDedupKey appends encodeSQSSegment(groupID)
and encodeSQSSegment(dedupID) back-to-back. encodeSQSSegment uses
base64.RawURLEncoding (no padding, alphabet [A-Za-z0-9_-]), so the boundary
between the two segments is variable-length and ambiguous: distinct
(groupID, dedupID) pairs can collapse onto the same byte sequence — most
trivially when one of the two is empty: ("", "abcd") and ("abcd", "") both
produce "...YWJjZA" (the base64 of "abcd"). Even for non-empty IDs the boundary depends on input
length mod 3 in subtle ways. This is the on-disk dedup-key format PR 5b-3
will rely on once the dormancy gate lifts; fixing it now avoids shipping
an ambiguous format.
Fix: insert sqsPartitionedQueueTerminator pipe-byte between the two segments.
That byte is outside RawURLEncoding's alphabet so neither segment can
contribute one of its own — the boundary becomes unambiguous regardless of
input length. Mirrors how sqsPartitionedMsgDataKey, MsgVisKey, MsgGroupKey,
and MsgByAgeKey already use the same terminator at the queue/partition
boundary.
Caller audit: sqsPartitionedMsgDedupKey is reached only via
sqsMsgDedupKeyDispatch, which has exactly two production callers —
loadFifoDedupRecord (read-side) and sendFifoMessage (write-side, via the
key returned from the same loadFifoDedupRecord call). Read and write share
one helper invocation, so the new format takes effect symmetrically with
no read/write skew window. parsePartitionedSQSKey only inspects the key
through the partition field and stops; the new terminator after groupID
does not affect parsing.
Note on dormancy: partitioned queues are still gated at CreateQueue
(§11 PR 2 dormancy gate, lifted atomically with the capability check in
PR 5b-3), so no production data carries the old format. This is purely
a forward-format correction.
Test: TestSqsPartitionedMsgDedupKey_GroupDedupSeparator covers
- the empty-segment collision class (the most blatant ambiguity),
- the new terminator's exact placement (key ends with the
b64(group)+pipe+b64(dedup) suffix, proving the byte sits at the boundary),
- non-empty pair non-collision (("ab", "cd") vs ("abcd", "")),
- read-write symmetry through sqsMsgDedupKeyDispatch (the dispatch
helper produces the same bytes as the underlying constructor, so
a regression that diverged the two would fail here).
@claude review Round 6 — addressed CodeRabbit 🟠 Major: dedup key segment-boundary ambiguity.

Fix: insert the sqsPartitionedQueueTerminator pipe byte between the encoded groupID and dedupID segments. Caller audit: sqsPartitionedMsgDedupKey is reached only via sqsMsgDedupKeyDispatch; read and write share one helper invocation, so there is no read/write skew window. Dormancy: partitioned queues are still gated at CreateQueue (§11 PR 2, lifted atomically with the capability check in PR 5b-3), so no production data carries the old format. Pure forward-format correction. Test: TestSqsPartitionedMsgDedupKey_GroupDedupSeparator.
Claude finished @bootjp's task in 2m 30s —— View job

Code Review: Round 6 — Dedup key segment-boundary fix

Overall assessment: The Round 6 fix is correct, minimal, and the right shape.
| Assertion | What regression it catches |
|---|---|
| `("", "abcd") ≠ ("abcd", "")` | Empty-segment collision — the most trivial ambiguity; fails without the terminator |
| `bytes.HasSuffix(key, b64(groupA)+"\|"+b64(dedup-token))` | Terminator placement — the pipe byte sits exactly at the group/dedup boundary |
| `("ab", "cd") ≠ ("abcd", "")` | Length-aligned non-empty collision — covers the length-mod-3 boundary cases |
| `sqsMsgDedupKeyDispatch(meta, ...) == sqsPartitionedMsgDedupKey(...)` | Read-write symmetry via the dispatch helper — prevents the dispatch helper diverging from the underlying constructor |
The existing TestSQSDedupKeyDispatch_PartitionedScopesByMessageGroupId (from Round 3) remains in place and still passes, providing orthogonal coverage: it asserts that two distinct groupID values on the same (queue, partition, dedupID) produce different keys. ✅
Minor observations (not blocking)
1. Comment block length and PR reference (sqs_keys.go:334–342)
The 9-line inline comment includes "CodeRabbit major, PR #732 round 6". Per CLAUDE.md: "one short line max" and "Don't reference the current task, fix, or callers... since those belong in the PR description and rot as the codebase evolves." Same pre-existing pattern as rounds 3–5; not introduced by Round 6 and not blocking.
2. Deferred 🟡 Minor: misleading post-delete comment (sqs_partitioned_dispatch_test.go:200–202)
Explicitly deferred by the trigger comment ("below the loop threshold"). The claim that sleeping 1100ms will re-expose drain-phase invisible messages is incorrect (drain uses VTO=60), but the direct keyspace scan at lines 183-198 is the definitive correctness guard — the comment overstates the API receive's coverage. Not blocking; acknowledged.
Correctness table (Round 6)
| Contract | Before Round 6 | After Round 6 |
|---|---|---|
| `(groupID, dedupID)` → unique dedup key | Ambiguous: variable-length segments back-to-back; `("", "X")` collides with `("X", "")` | Unambiguous: `b64(groupID)` + `\|` terminator + `b64(dedupID)` ✅ |
| `parsePartitionedSQSKey` | Stops at the partition field | Unaffected — new terminator is past its stopping point ✅ |
| Legacy keyspace | Unchanged | Unchanged ✅ |
| On-disk format for existing queues | N/A (partitioned queues gated at CreateQueue) | N/A — dormancy gate holds ✅ |
Verdict: Ready to merge.
💡 Codex Review
Reviewed commit: 5047223acc
    // dedup key so that two groups colliding onto the same partition keep
    // disjoint dedup namespaces (the AWS messageGroup-scope contract).
    func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
        key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, groupID, dedupID)
Reap partitioned dedup records to prevent key growth
loadFifoDedupRecord now resolves dedup keys via sqsMsgDedupKeyDispatch, which routes PartitionCount > 1 queues to the partitioned dedup prefix, but the dedup reaper still scans only the legacy SqsMsgDedupPrefix + encodeSQSSegment(queueName) range in adapter/sqs_reaper.go. Once partitioned FIFO is enabled, expired dedup entries for those queues will never be swept, so high-cardinality MessageDeduplicationId traffic causes unbounded dedup-key accumulation and steadily worse storage/scan overhead.
Summary
Stage 2 of Phase 3.D PR 5b: wire SendMessage / ReceiveMessage / DeleteMessage / ChangeMessageVisibility through the per-key dispatch helpers landed in PR 5b-1 (#731). The §11 PR 2 dormancy gate still rejects PartitionCount > 1 at CreateQueue, so production queues stay on the legacy keyspace and observable behaviour is byte-identical. PR 5b-3 lifts the gate atomically with the capability check.

What's wired
Send paths
- sendMessage, sendBatchStandardOnce: partition is a compile-time 0 (Standard queues reject PartitionCount > 1 via the cross-attribute validator). Dispatch helpers route to the legacy keyspace on the steady-state path.
- sendFifoMessage: hashes MessageGroupId once via partitionFor and threads the result through every key construction (data, vis, byage, dedup, group-lock).

Receive fanout
- scanAndDeliverOnce wraps the scan loop with a per-partition fanout over effectivePartitionCount(meta) iterations. The shared wall-clock + per-call Max budget caps apply across the whole call, not per-partition.
- sqsMsgCandidate carries a new partition field stamped at scan time, so loadCandidateRecord / expireMessage / commitReceiveRotation / classifyFifoGroupLock all route to the partition the message was originally stored under.

Delete + ChangeMessageVisibility
- handle.Partition from the v2 receipt handle drives every key construction. loadMessageForDelete / loadAndVerifyMessage invoke the new validateReceiptHandleVersion once meta is in scope.

Receipt-handle dispatch (the contract shift)
encodeReceiptHandleDispatch(meta, partition, ...) is the single point that picks v1 vs v2. decodeClientReceiptHandle is now version-agnostic; the queue-aware version check moved to validateReceiptHandleVersion. v1-on-partitioned and v2-on-non-partitioned both surface as ReceiptHandleIsInvalid, preserving the dormancy promise (no v2 wire-format observability from the public API) under the new contract. Since the §11 PR 2 dormancy gate keeps every production queue non-partitioned in this PR, every v2 handle still surfaces as ReceiptHandleIsInvalid downstream — exactly the PR 5a observable behaviour.

Reaper
- The reaper iterates legacy byAge keys only — partition-iterating enumeration ships in the later partition-reaper PR. buildReapOps / reapOneRecord pass nil meta + partition 0 through dispatch helpers so the keys are byte-identical to the pre-PR-5b layout.

What does NOT change yet
- CreateQueue still rejects PartitionCount > 1 with InvalidAttributeValue (the §11 PR 2 dormancy gate, lifted atomically in PR 5b-3).

Test plan
10 new tests across the contract surface.

Unit tests (adapter/sqs_keys_dispatch_v2_test.go):
- TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount — pins the version dispatch decision across nil/0/1/4 metas.
- TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1 — protects the byte-identical guarantee on legacy queues.
- TestEncodeReceiptHandleDispatch_PerQueueUsesV2 — perQueue + PartitionCount=4 still produces v2 handles (keyspace is partitioned even when partitionFor collapses every group to partition 0).
- TestValidateReceiptHandleVersion_QueueAwareRules — 10 sub-cases covering the (meta.PartitionCount × handle.Version) matrix, including perQueue + PartitionCount=4.
- TestValidateReceiptHandleVersion_NilHandle — defensive nil branch.
- TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned — named regression for the dormancy guarantee under the new contract.
- TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy — byte-identical to legacy sqsMsgVisScanBounds on legacy meta.
- TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix — different partitions yield disjoint scan ranges.
- TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace — pins the PR 5b-1 (#731) round 2 forward-note invariant: perQueue + PartitionCount=4 keeps the partitioned vis prefix at partition 0; collapsing to legacy would silently strand send writes.

Integration tests (adapter/sqs_partitioned_dispatch_test.go) install a partitioned meta directly on a queue created via the public API, short-circuiting the dormancy gate without disabling it for production CreateQueue:
- TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip — end-to-end smoke test: send 6 groups, receive surfaces all via fanout, every handle is v2, delete via v2 handle, queue is empty afterwards, legacy keyspace stays empty.
- TestSQSServer_PartitionedFIFO_RejectsV1Handle — forged v1 handle on a partitioned queue surfaces as ReceiptHandleIsInvalid via DeleteMessage and ChangeMessageVisibility.
- TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero — perQueue + PartitionCount=4 receive surfaces every message in one fanout pass; every v2 handle records Partition=0.

Updated TestDecodeClientReceiptHandle_RejectsV2 → TestDecodeClientReceiptHandle_AcceptsV2 to reflect the contract shift (rejection moved from the API boundary to meta-aware validateReceiptHandleVersion).

go test -race ./adapter/... (targeted SQS scope) clean. golangci-lint run ./adapter/... clean.

Self-review (per CLAUDE.md)
- Routing: PartitionCount <= 1 routes to legacy constructors. DLQ FIFO computes dlqPartition via partitionFor(dlqMeta, srcRec.MessageGroupId), so cross-queue redrive lands in the right partition. Receive fanout scans every partition. No issue.
- Concurrency: meta is loaded once and passed by pointer through helpers; no concurrent meta refresh in a single call. validateReceiptHandleVersion runs after the gen check, so a SetQueueAttributes race cannot flip the answer (PartitionCount is immutable). go test -race clean.
- Cost: one PartitionCount compare per dispatch. Receive fanout iterates effectivePartitionCount(meta) times: 1 on legacy / perQueue, N on perMessageGroupId, with N partitions splitting roughly 1/N of the messages each.
- Isolation: commitReceiveRotation uses cand.partition for both newVisKey and the handle. Cross-version handles are rejected as ReceiptHandleIsInvalid — no leak between keyspaces. The perQueue + PartitionCount=4 invariant is pinned by both unit and integration tests.

Caller audit (semantic-change discipline)
decodeClientReceiptHandle semantics changed from "reject all v2" to "decode any version, defer version validation to meta-aware caller". All 3 production callers audited via grep:
- parseQueueAndReceipt (sqs_messages.go:1576) → deleteMessageWithRetry → loadMessageForDelete → validateReceiptHandleVersion after meta load. ✅
- DeleteMessageBatch entry (sqs_messages_batch.go:475) → deleteMessageWithRetry → same as above. ✅
- ChangeMessageVisibilityBatch entry (sqs_messages_batch.go:575) → changeVisibilityWithRetry → loadAndVerifyMessage → validateReceiptHandleVersion. ✅

The dormancy guarantee (v2 → ReceiptHandleIsInvalid on non-partitioned) is preserved across every caller path.

Follow-ups
- PR 5b-3: gate the CreateQueue capability check via PollSQSHTFIFOCapability (Phase 3.D PR 4-B-3a, #721). Removes the validatePartitionDormancyGate call and verifies cluster-wide HT-FIFO support before accepting PartitionCount > 1.
- Partition-aware reaper enumeration (buildReapOps).
New Features
Bug Fixes
Tests